Skip to content

feature: Tracking and inspecting historical chunks#28

Merged
tmosleyIII merged 3 commits into
mainfrom
feature/tmosley/update-control-plane-features
Jun 14, 2026
Merged

feature: Tracking and inspecting historical chunks#28
tmosleyIII merged 3 commits into
mainfrom
feature/tmosley/update-control-plane-features

Conversation

@tmosleyIII

Copy link
Copy Markdown
Contributor

This pull request introduces support for tracking and inspecting historical "chunk runs" in the control plane, which represent bounded work units (such as ledger backfill/repair ranges) processed by data-plane components. The changes include new storage and API methods, CLI commands for operator inspection, and documentation for component integration.

Chunk Run Management and API:

  • Added new storage methods and types in internal/storage/interface.go and internal/storage/bolt.go to upsert, retrieve, list, and delete chunk runs, including error handling for not found cases. [1] [2] [3] [4] [5] [6]
  • Implemented UpsertChunkRun, GetChunkRun, and ListChunkRuns RPCs in the control plane server and wrapper (internal/api/control_plane.go). [1] [2]

CLI and Operator Tools:

  • Added a new flowctl chunks CLI command with list and show subcommands for inspecting chunk runs, including filtering and detailed output (cmd/chunks.go).

Component and Pipeline Runner Integration:

  • Ensured that orchestrated components receive the correct environment variables for chunk run reporting, including FLOWCTL_RUN_ID and FLOWCTL_ATTEMPT (internal/orchestrator/process.go, internal/runner/pipeline_runner.go). [1] [2]

Documentation and Testing:

  • Added documentation for the chunk run reporting contract and Go helper usage (docs/component-flowctl-reporting.md).
  • Added a unit test for chunk run upsert, retrieval, and listing in the control plane (internal/api/control_plane_test.go).

These changes provide a foundation for reliable tracking and inspection of historical work units, improving both operator visibility and data-plane component integration.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds first-class support for tracking and inspecting historical “chunk runs” (bounded work units like ledger backfill/repair ranges) end-to-end: protobuf API surface, control-plane RPC implementations, persistent storage, a Go component-side reporter helper, and operator-facing flowctl chunks CLI commands.

Changes:

  • Added protobuf types/enums and RPCs for chunk-run upsert/get/list, plus regenerated Go protobuf + gRPC bindings.
  • Implemented chunk-run storage across BoltDB + in-memory backends, and wired new RPC handlers into the control plane.
  • Introduced a Go reporter helper for components and a new flowctl chunks CLI command group, with accompanying docs and tests.

Reviewed changes

Copilot reviewed 12 out of 14 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
proto/control_plane.proto Defines ChunkRun, ChunkStatus, FailureClass, and chunk-run RPCs.
proto/control_plane.pb.go Regenerated Go protobuf bindings for the new API surface.
proto/control_plane_grpc.pb.go Regenerated Go gRPC bindings, adding chunk-run client/server methods.
pkg/component/reporter.go Adds a component-side reporter helper to register/heartbeat and upsert chunk status.
pkg/component/reporter_test.go Unit tests for env parsing and noop behavior when disabled.
internal/storage/memory.go Adds in-memory chunk-run CRUD + listing with filters/limits.
internal/storage/interface.go Extends storage interface with chunk-run types/methods + not-found error type.
internal/storage/bolt.go Adds BoltDB bucket + chunk-run persistence and list/get/delete operations.
internal/runner/pipeline_runner.go Injects FLOWCTL_RUN_ID / default FLOWCTL_ATTEMPT into component env.
internal/orchestrator/process.go Adds FLOWCTL_HEARTBEAT_INTERVAL_MS injection for process-managed components.
internal/api/control_plane.go Implements UpsertChunkRun, GetChunkRun, ListChunkRuns RPCs and wrapper methods.
internal/api/control_plane_test.go Adds test coverage for chunk-run upsert/get/list behavior.
docs/component-flowctl-reporting.md Documents component reporting contract, Go helper usage, and CLI inspection.
cmd/chunks.go Adds flowctl chunks list/show operator commands with filters and formatted output.
Files not reviewed (2)
  • proto/control_plane.pb.go: Generated file
  • proto/control_plane_grpc.pb.go: Generated file

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread internal/storage/bolt.go Outdated
Comment on lines +583 to +606
count := int32(0)
return b.ForEach(func(k, v []byte) error {
if limit > 0 && count >= limit {
return nil
}

var chunk ChunkRunInfo
if err := json.Unmarshal(v, &chunk); err != nil {
return fmt.Errorf("failed to unmarshal chunk run: %w", err)
}
if pipelineRunID != "" && chunk.Chunk.PipelineRunId != pipelineRunID {
return nil
}
if componentID != "" && chunk.Chunk.ComponentId != componentID {
return nil
}
if status != flowctlpb.ChunkStatus_CHUNK_STATUS_UNKNOWN && chunk.Chunk.Status != status {
return nil
}

chunks = append(chunks, &chunk)
count++
return nil
})
Comment on lines +687 to +693
chunk := req.Chunk
if chunk.Attempt == 0 {
chunk.Attempt = 1
}
if chunk.ChunkId == "" {
chunk.ChunkId = fmt.Sprintf("%s:%s:%d-%d:%d", chunk.PipelineRunId, chunk.ComponentId, chunk.ChunkStart, chunk.ChunkEnd, chunk.Attempt)
}
Comment thread cmd/chunks.go

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 50a2e534bd

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread pkg/component/reporter.go
Comment on lines +276 to +283
_, err := r.ReportChunk(ctx, ChunkUpdate{
ChunkStart: start,
ChunkEnd: end,
Status: status,
CompletedAt: &now,
VerifiedAt: verifiedAt,
RowCounts: rowCounts,
Verification: verification,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve prior chunk fields on completion

When callers follow the documented helper sequence (ReportChunkProgress and then ReportChunkCompleted for the same range), both calls get the same generated chunk id and the server upsert replaces the stored record. This completion update omits fields set by the progress update such as StartedAt, Phase, and Metadata, so the final historical chunk no longer shows when it started or what phase it reached; merge the previous chunk state or carry these fields forward on terminal updates.

Useful? React with 👍 / 👎.

Comment thread pkg/component/reporter.go Outdated
Comment on lines +76 to +79
conn, err := grpc.DialContext(ctx, cfg.Endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound reporter connection attempts

If a component passes a long-lived/root context to NewReporter (as in the new usage docs), grpc.WithBlock() waits until the control-plane endpoint connects or that context is cancelled. With a misconfigured or temporarily unavailable endpoint this hangs component startup indefinitely instead of failing fast, so the dial should use an internal timeout or avoid blocking unless the caller supplied a deadline.

Useful? React with 👍 / 👎.

Comment thread pkg/component/reporter.go
Comment on lines +138 to +140
_, err := r.client.Heartbeat(ctx, &flowctlpb.ServiceHeartbeat{
ServiceId: serviceID,
Metrics: metrics,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wire reporter heartbeat metrics through the wrapper

These metrics are sent on the flowctlpb heartbeat path, but the registered ControlPlaneWrapper.Heartbeat currently converts the request into a v1 heartbeat with only ServiceId, so values from the new StartHeartbeatLoop (for example ledgers_processed) are silently dropped before the storage merge and never appear in status/monitoring. Convert the metric map in the wrapper or route the reporter through the v1 API.

Useful? React with 👍 / 👎.

@tmosleyIII tmosleyIII merged commit 63cefb0 into main Jun 14, 2026
3 checks passed
@tmosleyIII tmosleyIII deleted the feature/tmosley/update-control-plane-features branch June 14, 2026 15:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants